package com.supply.hsa.link.task.fetch.hotel;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import com.supply.hsa.link.common.constant.SupplierClass;
import com.supply.hsa.link.task.fetch.*;
import com.supply.hsa.link.task.fetch.TaskSwitch;
import com.supply.hsa.link.task.fetch.hotel.analyze.AnalyzeHotelTask;
import com.supply.hsa.link.task.fetch.hotel.container.TaskHandlerContainer;
import com.supply.hsa.link.task.fetch.hotel.dto.CommonSyncResponse;
import com.supply.hsa.link.task.fetch.enums.ResponseCodeEnum;
import com.supply.hsa.link.task.fetch.server.CommonSupplyFetchService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;


/**
 * 远程获取产品信息任务 执行器
 *
 */
@Service
@Slf4j
public class FetchHotelTaskExecutor implements TaskExecutor<FetchHotelTask>, ApplicationListener<ContextRefreshedEvent>{
	
	@Autowired
	private CommonSupplyFetchService commonSupplyFetchService;
	
	@Autowired
	private TaskHandlerContainer taskHandlerContainer;
	
	@SuppressWarnings("rawtypes")
	@Autowired
	@Qualifier("analyzeHotelTaskExecutor")
	private TaskExecutor<AnalyzeHotelTask> taskExecutor;
	
	//FIFO 执行任务
	private Map<SupplierClass, LinkedBlockingQueue<FetchHotelTask>> taskQueue;
	
	//等待执行的任务队列，存储暂停的定时任务
	private Map<SupplierClass, LinkedBlockingQueue<FetchHotelTask>> watingTaskQueue;
	
	private final ReentrantLock countLock = new ReentrantLock();
	
	@Autowired
	private TaskSwitch taskSwitch;
	
	//消费者线程数量
	private int consumerThreadNum = 2;
		
	@Override
	public void onApplicationEvent(ContextRefreshedEvent event) {
		if(event.getApplicationContext().getDisplayName().equals(CommonConstant.SPRING_APPLICATION_CONTEXT_NAME)){
			Set<SupplierClass> supplierClasseSet = this.taskHandlerContainer.getRegisteredSupplier();
			if(CollectionUtils.isNotEmpty(supplierClasseSet)){
				taskQueue = new HashMap<SupplierClass, LinkedBlockingQueue<FetchHotelTask>>(supplierClasseSet.size());
				watingTaskQueue = new HashMap<SupplierClass, LinkedBlockingQueue<FetchHotelTask>>(supplierClasseSet.size());
				
				for(SupplierClass supplierClass : supplierClasseSet){
					taskQueue.put(supplierClass, new LinkedBlockingQueue<FetchHotelTask>());
					watingTaskQueue.put(supplierClass, new LinkedBlockingQueue<FetchHotelTask>());
					
					//开启的线程数不易过多， 设置为2已经可以最大程度的发挥 抓取酒店每天数据 的并发能力(线程池线程数是90)
					for(int i=1; i<=consumerThreadNum; i++){
						FetchRemoteTaskConsumer consumer = new FetchRemoteTaskConsumer(supplierClass);
						consumer.setName(supplierClass.getSupplierClass()+"-FetchRemoteTaskConsumer"+i);
						consumer.setUncaughtExceptionHandler(new UncaughtExceptionHandler(){
							@Override
							public void uncaughtException(Thread t, Throwable e) {
								log.error("并发线程异常",t, e);
							}
						});
						consumer.start();
					}
					
					WaitingTaskConsumer waitingTaskConsumer = new WaitingTaskConsumer(supplierClass);
					waitingTaskConsumer.setName(supplierClass.getSupplierClass()+"-WaitingTaskConsumer");
					waitingTaskConsumer.setUncaughtExceptionHandler(new UncaughtExceptionHandler(){
						@Override
						public void uncaughtException(Thread t, Throwable e) {
							log.error("WaitingTaskConsumer exception!",t, e);
						}
					});
					waitingTaskConsumer.start();
				}
			}
		}
	}
	
	private class FetchRemoteTaskConsumer extends Thread{
		private SupplierClass supplierClass;
		FetchRemoteTaskConsumer(SupplierClass supplierClass){
			this.supplierClass = supplierClass;
		}
		@Override
		public void run() {
			while (true) {
				try {
					//移除并获取队列头部任务
					FetchHotelTask fetchRemoteTask = taskQueue.get(supplierClass).take();

					WorkerStatus timerTaskStatus = taskSwitch.getTimerTaskStatus(supplierClass);
					if(timerTaskStatus!=null) {
						switch (timerTaskStatus) {
							case PAUSING:
								//定时器暂停时将任务加入到暂停队列
								watingTaskQueue.get(fetchRemoteTask.getSupplierClass()).put(fetchRemoteTask);
								break;
							case STOPPED:
								//完成任务
								execute(fetchRemoteTask);
								break;
							default:
								//非暂停、停止状态，则执行任务
								execute(fetchRemoteTask);
								break;
						}
					}
				} catch (InterruptedException e) {
					log.error("FetchRemoteTaskConsumer error",e);
				}
			}
		}
	}
	
	private class WaitingTaskConsumer extends Thread{
		private SupplierClass supplierClass;
		WaitingTaskConsumer(SupplierClass supplierClass){
			this.supplierClass = supplierClass;
		}
		@Override
		public void run(){
			while(true){
				try {
					WorkerStatus timerTaskStatus = taskSwitch.getTimerTaskStatus(this.supplierClass);
					if(timerTaskStatus!=null){
						if(WorkerStatus.RUNNING.equals(timerTaskStatus)){
							FetchHotelTask fetchRemoteTask = watingTaskQueue.get(supplierClass).take();
							add(fetchRemoteTask);
						}else if(WorkerStatus.STOPPED.equals(timerTaskStatus)){
							FetchHotelTask fetchRemoteTask = watingTaskQueue.get(supplierClass).take();
							
							execute(fetchRemoteTask);
						}else{
							TimeUnit.SECONDS.sleep(10);
						}
					}else{
						TimeUnit.SECONDS.sleep(10);
					}
				} catch (InterruptedException e) {
					log.error("WaitingTaskConsumer exception!",e);
				}
			}
		}
	}
	
	private boolean validTask(FetchHotelTask task){
		if(task == null 
				|| task.getSupplierClass() == null
				|| task.getRequest() == null){
			return false;
		}
		
		return true;
	}
	
	@Override
	public void add(FetchHotelTask task) {
		if(!this.validTask(task)){
			return;
		}
		
		try {
			//任务加入时间
			task.setJoinDate(new Date());
			//添加任务到队列尾部
			taskQueue.get(task.getSupplierClass()).put(task);
			
			TimeUnit.SECONDS.sleep(1);
		} catch (InterruptedException e) {
			log.error("添加同步任务异常，供应商：{}",task.getSupplierClass(),e);
		}
	}
	
	@SuppressWarnings("rawtypes")
	@Override
	public void execute(FetchHotelTask task) {
		if(!this.validTask(task)){
			return;
		}
		
		//初始化任务
		task.initTask();
		//任务类型
		task.setTaskName(CommonConstant.TaskType.FETCH_HOTEL_TASK);
		
		//注册任务
		TaskManager.getInstance().register(task);
				
		WorkerStatus timerTaskStatus = taskSwitch.getTimerTaskStatus(task.getSupplierClass());
		boolean enableExecute = WorkerStatus.RUNNING.equals(timerTaskStatus);
		
		if(enableExecute){
			CommonSyncResponse response = null;
			try {
				//抓取供应商数据
				response = this.commonSupplyFetchService.fetchHotelProductInfo(task);
				
				if(ResponseCodeEnum.FetchParamError.resultCode.equals(response.getResultCode())){
					task.setResult(false);
					task.setFailReason(task.getSupplierClass() + "," + ResponseCodeEnum.FetchParamError.message + "," + task.getRequest());
					task.setEnableLog(true);
					this.finish(task);
					
					return;
				}
				
				if(CollectionUtils.isEmpty(response.getResultList())){
					task.setResult(true);
					task.setEnableLog(true);
					this.finish(task);
					
					return;
				}
			} catch (Exception e) {
				log.error(task.getSupplierClass()+"抓取供应商数据异常.", e);
				
				task.setResult(false);
				task.setFailReason(e.getMessage());
				task.setEnableLog(true);
				this.finish(task);
				
				return;
			}
			
			//异步实时消费抓取的数据
			AnalyzeHotelTask analyzeHotelTask = new AnalyzeHotelTask();
			analyzeHotelTask.setParentTaskId(task.getTaskId());//解析任务作为抓取任务的子任务，解析任务执行完，才表示完整执行完一个流程
			analyzeHotelTask.setSupplierClass(task.getSupplierClass());
			analyzeHotelTask.setFetchRequest(task.getRequest());
			analyzeHotelTask.setFetchResponse(response);
			
			taskExecutor.add(analyzeHotelTask);
		}else{
			this.finish(task);
		}
	}

	@Override
	public void stop(FetchHotelTask task) {
	}
	
	@Override
	public void pause(FetchHotelTask task) {
	}

	@Override
	public void resume(FetchHotelTask task) {
	}

	@Override
	public void finish(FetchHotelTask task) {
		if(!this.validTask(task)){
			return;
		}
		
		task.setWorkStatus(WorkerStatus.FINISH);
		task.setEndDate(new Date());
		
		TaskManager taskManager = TaskManager.getInstance();
		
		taskManager.unregister(task);
	}

	@SuppressWarnings("rawtypes")
	public void increment(FetchHotelTask task, Task childTask) {
		if(!this.validTask(task)){
			return;
		}
		
		if(childTask!=null){
			if(childTask instanceof AnalyzeHotelTask){
				//数据解析入库完，表示一个酒店的抓取流程处理完毕
				AnalyzeHotelTask analyzeHotelTask = (AnalyzeHotelTask)childTask;
				task.getTaskResult().setAnalyzeHotelTaskResult(analyzeHotelTask.getTaskResult());
				
				task.setResult(true);
				task.setEnableLog(true);
				
				this.finish(task);
			}
//			else if(childTask instanceof FetchHotelDayTask){
//				final ReentrantLock countLock = this.countLock;
//				countLock.lock();
//				try{
//					FetchHotelDayTask fetchHotelDayTask = (FetchHotelDayTask)childTask;
//
//					//任务总数
//					task.getTaskResult().setTaskCount(task.getTaskResult().getTaskCount()+1);
//					//成功的任务总数
//					if(childTask.isResult()){
//						task.getTaskResult().setTaskSuccessCount(task.getTaskResult().getTaskSuccessCount()+1);
//					}else{
//						task.getTaskResult().setTaskFailureCount(task.getTaskResult().getTaskFailureCount()+1);
//					}
//
//					//记录抓取失败的数据
//					if(!fetchHotelDayTask.isResult()){
//						if(SupplyFetchErrorEnum.HTTP_EXCEPTION == fetchHotelDayTask.getTaskResult().getError()){
//							task.getTaskResult().setFetchHttpFailureCount(task.getTaskResult().getFetchHttpFailureCount() + 1);
//							task.getTaskResult().getFetchHttpFailureList().add(fetchHotelDayTask.getTaskResult());
//						}
//					}
//				}finally{
//					countLock.unlock();
//				}
//			}
		}
	}

	public long count(FetchHotelTask task) {
		if(!this.validTask(task)){
			return 0;
		}
		
		return 0;
	}

}
